feat(2152): rolling ParquetWriter for streaming writes (constant memory + spec-correct file sizes) #3336
Draft
paultmathew wants to merge 3 commits into apache:main
added 3 commits
May 6, 2026 23:18
Currently `Table.append(df)` and `Table.overwrite(df)` only accept a materialised `pa.Table`, which forces callers to load the entire dataset into memory before writing. This makes pyiceberg unusable for large or unbounded inputs and has been a recurring complaint (apache#1004, apache#2152, dlt-hub#3753).

Allow `pa.RecordBatchReader` as an alternative input. When a reader is provided, batches are streamed and microbatched into target-sized Parquet files via the new `bin_pack_record_batches` helper, then committed in a single snapshot via the existing fast_append path. Memory is bounded by `write.target-file-size-bytes` (default 512 MiB) per worker rather than the full input size.

Scope of this PR — unpartitioned tables only. Streaming into partitioned tables raises NotImplementedError pointing back to apache#2152; partitioned support needs additional design (high-cardinality partition handling, per-partition rolling writers) and is tracked as a follow-up. Mirrors iceberg-go#369's staging — that project shipped unpartitioned streaming first.

Internal note: the implementation buffers up to `target_file_size` of in-memory RecordBatches before flushing to a Parquet file. A more memory-efficient rolling-ParquetWriter approach is a planned follow-up that will benefit from the `OutputStream.tell()` API added in apache#2998.
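For context, this is roughly how the streaming input from that commit is meant to be used. A minimal sketch, assuming an already-configured catalog and an existing table; the catalog name, table identifier, and schema below are illustrative, not taken from the PR:

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")       # assumes a configured catalog named "default"
tbl = catalog.load_table("db.events")   # assumes this table already exists (unpartitioned)

schema = pa.schema([("id", pa.int64()), ("payload", pa.string())])

def batches():
    # Generator of RecordBatches; the source could be a DB cursor, API, or queue.
    for i in range(1_000):
        start = i * 5_000
        yield pa.RecordBatch.from_arrays(
            [pa.array(range(start, start + 5_000), type=pa.int64()),
             pa.array(["x" * 100] * 5_000)],
            schema=schema,
        )

reader = pa.RecordBatchReader.from_batches(schema, batches())
tbl.append(reader)  # streams in bounded memory instead of materialising a pa.Table
```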
Rationale for this change
Builds on #3335. Please land that one first.
PR #3335 added `pa.RecordBatchReader` as a valid input to `Table.append` / `Table.overwrite` using a buffered bin-pack approach. That implementation has two acknowledged caveats called out in its docstrings:

- Peak memory is N_workers × `write.target-file-size-bytes` (~4 GiB at defaults) — better than "materialise everything" but not constant.
- `write.target-file-size-bytes` is interpreted as uncompressed in-memory Arrow bytes via the `bin_pack_record_batches` helper, not on-disk compressed Parquet bytes. Resulting files are typically 3-10× smaller than the property suggests.

This PR replaces the buffered approach with a rolling `pq.ParquetWriter` driven by `OutputStream.tell()` (added in #2998 specifically for this purpose). Both caveats go away; a minimal sketch of the rolling loop follows the list below.

What this delivers:

- `tell()` reports the compressed on-disk bytes pyarrow has emitted to the stream, so `write.target-file-size-bytes` finally means what the spec says it means — matching the Java/Spark/Flink writers.
- Constant memory, independent of `target_file_size`: peak RSS is bounded by one input batch + the Parquet page buffer (~1 MiB × columns) + the S3 multipart upload pool (~5 MiB × ~8 in-flight parts). On a real S3 stack that's tens to a few hundred MiB, regardless of file size, dataset size, or number of files produced. See benchmark below.
- Each input `RecordBatch` becomes one Parquet row group, with `write.parquet.row-group-limit` enforced as a per-row-group cap — identical treatment to the materialised `pa.Table` write path.
- No public API change: `tx.append(reader)` / `tx.overwrite(reader)`. Internals only.
- `bin_pack_record_batches` is removed (no longer needed). Its 4 unit tests are removed; the streaming behaviour is covered end-to-end by the tests below.
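As a sketch of the rolling-writer idea (not the PR's exact code), assuming the output stream exposes `tell()`; the helper `new_output_stream` and the parameter `target_size` are illustrative names:

```python
import pyarrow as pa
import pyarrow.parquet as pq


def stream_to_rolling_files(reader: pa.RecordBatchReader, new_output_stream, target_size: int) -> None:
    """Roll to a new Parquet file once tell() crosses target_size (illustrative)."""
    writer = stream = None
    for batch in reader:                      # one RecordBatch at a time; nothing buffered
        if writer is None:
            stream = new_output_stream()      # e.g. an S3 multipart-upload output stream
            writer = pq.ParquetWriter(stream, reader.schema, compression="zstd")
        writer.write_batch(batch)             # roughly one row group per input batch
        if stream.tell() >= target_size:      # compressed bytes emitted so far
            writer.close()                    # finalize the footer and roll over
            stream.close()
            writer = stream = None
    if writer is not None:                    # close the last, partially filled file
        writer.close()
        stream.close()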
Memory profile

Streamed 1,000 batches × 5,000 rows × 108 bytes per row (≈ 515 MiB uncompressed, 390 MiB on disk after zstd of a random alphanumeric payload, 24 files written at `write.target-file-size-bytes` = 16 MiB) against AWS Glue + S3 (Aircall staging). Process RSS was sampled at 19 Hz from a background thread. Detailed analysis below.

The key observation: after the initial ramp during the first file, RSS oscillates within a ~30 MiB band across all 24 file rollovers and shows no growth from start to finish. Memory is bounded by the in-flight `RecordBatch` + Parquet page buffer + multipart upload pool — independent of `target_file_size`, dataset size, or number of files produced. The repository's previous buffered approach (#3335) held up to `target_file_size × N_workers` of uncompressed Arrow buffers (≈ 4 GiB at defaults) — roughly 70× higher peak memory at default property settings.
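For reference, the kind of background RSS sampler this implies looks roughly like the following. A sketch assuming `psutil`; the actual benchmark harness is not part of the PR diff:

```python
import threading
import time

import psutil  # assumed dependency of the benchmark harness, not of pyiceberg


def sample_rss(samples, stop, hz=19.0):
    proc = psutil.Process()
    while not stop.is_set():
        samples.append((time.monotonic(), proc.memory_info().rss))
        time.sleep(1.0 / hz)


samples, stop = [], threading.Event()
sampler = threading.Thread(target=sample_rss, args=(samples, stop), daemon=True)
sampler.start()
# ... run tbl.append(reader) against the staging catalog here ...
stop.set()
sampler.join()
print(f"peak RSS: {max(rss for _, rss in samples) / 2**20:.1f} MiB")
```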
Throughput / parallelism

Streaming writes are sequential — one rolling file at a time. Single-stream throughput is bounded by the underlying multipart upload pool (~8 concurrent S3 PUTs in `pyarrow.fs.S3FileSystem`), which saturates typical network links and is rarely the bottleneck for streaming pipelines (where the upstream source — DB cursor, API, queue — is the limit). Callers wanting maximum write throughput (backfills, dataset migrations) can materialise the input as a `pa.Table` and call `tx.append(pa.Table)`, which keeps the existing `executor.map`-based file-level parallelism of the materialised path completely unchanged by this PR.

A hybrid worker pool for the streaming path (N concurrent rolling writers fed by a queue) is a possible follow-up if real workloads show streaming write throughput to be a bottleneck. This mirrors iceberg-go's roadmap, which has shipped single-writer-only streaming since April 2025 (iceberg-go#369) without follow-up demand.
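In code, the two alternatives described above look roughly like this (`tbl` and `reader` as in the earlier sketch):

```python
# Streaming path: sequential rolling writer, constant memory.
tbl.append(reader)

# Throughput path: materialise first, keep the existing parallel file writes.
tbl.append(reader.read_all())  # read_all() loads the remaining batches into a pa.Table
```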
Properties honored
The streaming path honors the same Parquet writer properties as the materialised `pa.Table` path:

- `write.target-file-size-bytes` — via `OutputStream.tell()`, on-disk compressed bytes
- `write.parquet.compression-codec` / `write.parquet.compression-level`
- `write.parquet.page-size-bytes`
- `write.parquet.page-row-limit`
- `write.parquet.dict-size-bytes`
- `write.parquet.row-group-limit` (pyiceberg-internal)
- `write.parquet.row-group-size-bytes` — not honored by `_get_parquet_writer_kwargs` for both paths; out of scope here
- `write.parquet.bloom-filter-*`

When someone fixes `write.parquet.row-group-size-bytes` for pyiceberg, both write paths benefit. PR 2 deliberately doesn't touch this since the gap predates this PR series.
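As an illustration of how these properties are typically set before a streaming write (the values are examples only; `tbl` and `reader` as above):

```python
with tbl.transaction() as tx:
    tx.set_properties({
        "write.target-file-size-bytes": str(128 * 1024 * 1024),  # now on-disk compressed bytes
        "write.parquet.compression-codec": "zstd",
        "write.parquet.row-group-limit": "1048576",
    })

tbl.append(reader)  # the rolling writer picks these up like the materialised path does
```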
Code duplication note

`_record_batches_to_data_files` shares some boilerplate with the materialised `write_file`'s nested `write_parquet` closure: parquet-writer-kwargs / row-group-size / location-provider extraction, `file_schema` selection, and `DataFile` construction from Parquet metadata. The shared module-level helpers (`_get_parquet_writer_kwargs`, `_to_requested_schema`, `data_file_statistics_from_parquet_metadata`, etc.) are reused, but the "compose these helpers in the standard pattern" wrapper lives independently in each path.

Extraction is mechanical (~100 lines of pure refactor) but I'd prefer to land it as a standalone follow-up PR — it touches the existing `write_file` closure, which I'd rather not modify in the same PR as the new streaming implementation.

Are these changes tested?
Yes, at four layers.
1. End-to-end behaviour tests (no Docker)
`tests/catalog/test_catalog_behaviors.py` — 10 tests parametrised across all three in-process catalog backends (`memory`, `sql`, `sql_without_rowcount`) → 30 test runs:

- `test_append_record_batch_reader` — basic append round-trip.
- `test_append_record_batch_reader_microbatched` — multi-file rollover via `target-file-size-bytes=1`.
- `test_append_record_batch_reader_row_group_limit_is_cap` — feeds a single 1,000-row batch, sets `row-group-limit=250`, asserts the resulting Parquet file has exactly 4 row groups of 250 rows each (verified by reading the Parquet footer with `pq.read_metadata`).
- `test_append_record_batch_reader_target_file_size_is_on_disk_bytes` — sets `target-file-size-bytes=32 KiB`, streams ~12 MiB, asserts each rolled file is between 0.5× and 5× the target. Catches a regression to the old uncompressed-Arrow-bytes behaviour (which would produce files ~3-10× smaller than the target); see the sketch after this list.
- `test_append_record_batch_reader_empty` — empty reader produces zero data files.
- `test_overwrite_record_batch_reader` — overwrite via reader replaces existing rows.
- `test_append_record_batch_reader_to_partitioned_table_raises` — partitioned-table input raises `NotImplementedError`.
- `test_append_invalid_input_type_raises` — non-Arrow input rejected.
- `test_record_batch_reader_consumed_exactly_once` — reader generator drained once; no double-pass regression.
- `test_record_batch_reader_schema_mismatch_writes_no_files` — schema mismatch fails before any data files are written (no orphan files).
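A simplified sketch of the on-disk-size assertion referenced above; fixture names like `make_reader` are illustrative, and the real test lives in `tests/catalog/test_catalog_behaviors.py`:

```python
def test_target_file_size_is_on_disk_bytes(tbl, make_reader):
    target = 32 * 1024  # bytes
    with tbl.transaction() as tx:
        tx.set_properties({"write.target-file-size-bytes": str(target)})

    tbl.append(make_reader(approx_uncompressed_bytes=12 * 1024 * 1024))

    sizes = [task.file.file_size_in_bytes for task in tbl.scan().plan_files()]
    assert sizes, "expected at least one data file"
    for size in sizes:
        # On-disk compressed bytes should land near the target, not 3-10x under it.
        assert 0.5 * target <= size <= 5 * target
```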
2. Spark integration tests

`tests/integration/test_writes/test_writes.py` — 6 tests (× v1, v2 format versions) proving Spark can read tables written via the streaming path against the docker-compose stack:

- `test_append_record_batch_reader[1, 2]`
- `test_overwrite_record_batch_reader[1, 2]`
- `test_append_record_batch_reader_multifile[1, 2]`

3. Local CI sweep
- `make test` (full unit suite): 3,650 passed, 0 failed; lint + mypy + pydocstyle clean.
- `make test-integration` (full Spark integration suite on a fresh docker-compose): 396 passed, 1 skipped, 0 failed in 3:47.

4. Real-stack smoke test on AWS
Verified end-to-end against AWS Glue + S3 in our staging account:

- Athena readback (`smoke_test_athena_readback.py`): streamed 50,000 rows via `tx.append(reader)`, then queried via Athena → `COUNT(*) = 50000` ✓, `MAX(id) = 49999` ✓. Confirms the Glue catalog metadata, Iceberg manifest, and Parquet footer produced by the rolling writer are valid from an external query engine's perspective.
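A sketch of what that readback check looks like, assuming boto3; the database, table, and output-location names are illustrative:

```python
import time

import boto3

athena = boto3.client("athena")
qid = athena.start_query_execution(
    QueryString="SELECT COUNT(*), MAX(id) FROM staging_db.streaming_smoke_test",
    ResultConfiguration={"OutputLocation": "s3://my-athena-results/smoke/"},
)["QueryExecutionId"]

# Poll until the query leaves the queued/running states.
while True:
    state = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"]
    if state not in ("QUEUED", "RUNNING"):
        break
    time.sleep(1)

assert state == "SUCCEEDED", state
rows = athena.get_query_results(QueryExecutionId=qid)["ResultSet"]["Rows"]
count, max_id = (int(col["VarCharValue"]) for col in rows[1]["Data"])  # rows[0] is the header
assert count == 50_000 and max_id == 49_999
```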
Are there any user-facing changes?

Effectively none beyond what #3335 already introduced — this PR changes internals only:

- `write.target-file-size-bytes` semantics tighten for streaming inputs. A user who set this property on a streaming-write workflow under #3335 (feat(2152): support pa.RecordBatchReader in Table.append/overwrite) was getting files 3-10× smaller than configured (uncompressed Arrow bytes proxy). With this PR the property now reflects actual on-disk compressed bytes — files become correspondingly larger. This is a net win and matches the spec, but worth noting for anyone who calibrated batch sizes around the old behaviour.
- The `bin_pack_record_batches` helper is removed from `pyiceberg.io.pyarrow`. It was added in #3335 (so it has never been in a release) and its only consumer was `_dataframe_to_data_files`'s streaming branch, which is now restructured.

The public `Table.append(reader)` / `Table.overwrite(reader)` API and its docstring guarantees are unchanged.

Related
- `OutputStream.tell()` from #2998 (feat: Add tell() to OutputStream writers) — already merged
- `write.target-file-size-bytes`